Implement S3VersionStore::store_version_from_reader#398
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings, or use the checkboxes below for quick actions.
📝 Walkthrough

Change VersionStore's reader API to take an owned boxed async reader; implement the S3 store to stream an owned reader into S3 PutObject via a custom `http_body::Body` adapter; add AWS-related dependency pins and an OxenError variant for AWS SDK errors; update call sites and the local store accordingly.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller
    participant S3Store as S3VersionStore
    participant Reader
    participant S3Client
    participant S3Service as AWS S3
    Caller->>S3Store: store_version_from_reader(hash, reader: Box<AsyncRead>)
    S3Store->>S3Client: init client
    S3Store->>Reader: wrap Reader in ReaderStream -> ReaderBody
    S3Store->>S3Client: put_object(key, body=ReaderBody)
    S3Client->>S3Service: HTTP PUT with streaming body
    S3Service-->>S3Client: 200 OK
    S3Client-->>S3Store: response
    S3Store-->>Caller: Result::Ok or OxenError
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (warning)
Actionable comments posted: 2
🧹 Nitpick comments (1)
crates/lib/src/error.rs (1)
150-151: Preserve the wrapped AWS error as the source. Line 151 stores the SDK error, but `thiserror` only wires `Error::source()` from fields marked `#[source]`, named `source`, or annotated with `#[from]`. As written, callers will see the display message but lose the underlying AWS error chain. (docs.rs)

♻️ Likely fix:

```diff
- AwsSdkError(Box<dyn std::error::Error + Send + Sync>),
+ AwsSdkError(#[source] Box<dyn std::error::Error + Send + Sync>),
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@crates/lib/src/error.rs` around lines 150 - 151, The AwsSdkError enum variant currently wraps the AWS error but doesn't expose it as the source for Error::source(); update the AwsSdkError variant in the error enum (the AwsSdkError(...) variant in error.rs) to mark the inner boxed error as the source (either by adding the #[source] attribute to the variant or by renaming the inner field to source) so thiserror will wire through the underlying AWS error chain; keep the existing #[error("AWS SDK error: {0}")] display message while adding the source annotation to preserve error chaining.
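The effect of that `#[source]` annotation can be sketched without `thiserror` by wiring `Error::source()` by hand. The `SdkError` stand-in below is illustrative, not the AWS SDK's real error type; the point is only that omitting `source()` loses the chain:

```rust
use std::error::Error;
use std::fmt;

// Hypothetical stand-in for the boxed AWS SDK error.
#[derive(Debug)]
struct SdkError(String);

impl fmt::Display for SdkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}
impl Error for SdkError {}

// Roughly what `#[error("AWS SDK error: {0}")]` plus `#[source]` expand to.
#[derive(Debug)]
struct AwsSdkError(Box<dyn Error + Send + Sync>);

impl fmt::Display for AwsSdkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "AWS SDK error: {}", self.0)
    }
}

impl Error for AwsSdkError {
    // Without this method (thiserror's `#[source]`), callers see only
    // the display text and the underlying AWS error chain is lost.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(self.0.as_ref())
    }
}

fn main() {
    let err = AwsSdkError(Box::new(SdkError("access denied".into())));
    println!("{}", err);
    // Callers can now walk the chain.
    println!("caused by: {}", err.source().unwrap());
}
```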
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/lib/src/storage/s3.rs`:
- Line 186: The multipart upload currently hardcodes PART_SIZE = 8 * 1024 * 1024
and increments part_num in the upload loop; add a guard to prevent exceeding
S3's 10,000-part limit by either (a) checking the reader/content length up front
(if available) and computing a larger PART_SIZE so total_parts <= 10_000, or (b)
if length is unknown, detect in the upload loop (where part_num is incremented)
and return a clear error when part_num > 10_000. Update the multipart upload
function that defines PART_SIZE and uses part_num to perform this validation and
return an error early instead of continuing to stream.
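One way to implement option (a) is to derive the part size from the known content length; the constants match the review comment, but the helper name and error handling below are illustrative, not the PR's actual code:

```rust
/// Matches the hardcoded PART_SIZE flagged in the review comment.
const MIN_PART_SIZE: u64 = 8 * 1024 * 1024;
/// S3's hard limit on parts per multipart upload.
const MAX_PARTS: u64 = 10_000;

/// Option (a): when the content length is known up front, grow the part
/// size so the total part count stays within the S3 limit. Option (b),
/// for unknown lengths, must instead error inside the upload loop once
/// part_num exceeds MAX_PARTS.
fn part_size_for(content_length: Option<u64>) -> Result<u64, String> {
    match content_length {
        // Round up so MAX_PARTS parts of this size always cover `len`.
        Some(len) => Ok(len.div_ceil(MAX_PARTS).max(MIN_PART_SIZE)),
        None => Err(format!(
            "length unknown: enforce part_num <= {} in the upload loop",
            MAX_PARTS
        )),
    }
}

fn main() {
    // 100 MiB fits comfortably at the minimum part size.
    assert_eq!(part_size_for(Some(100 * 1024 * 1024)).unwrap(), MIN_PART_SIZE);
    // 1 TiB forces a larger part size to stay under 10,000 parts.
    let big = 1024u64 * 1024 * 1024 * 1024;
    let part = part_size_for(Some(big)).unwrap();
    assert!(part > MIN_PART_SIZE && big.div_ceil(part) <= MAX_PARTS);
}
```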
- Around line 238-336: The spawned upload_task (JoinHandle named upload_task)
can continue issuing upload_part calls after the producer errors and before
abort_multipart_upload runs; ensure the worker is explicitly cancelled and
awaited before calling abort_multipart_upload: on any early return/error in the
producer/result block, call upload_task.abort() (or send a cancellation signal
via tx/closure), then await upload_task (handle the JoinError) to guarantee all
in-flight part uploads stop/complete before invoking
client.abort_multipart_upload(); update the error paths around the producer loop
and the Result handling where upload_task is dropped so upload_task.abort() and
upload_task.await are executed prior to calling abort_multipart_upload.
---
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 3b22fdac-fc5c-48a3-8b7b-9093413ee5d9
⛔ Files ignored due to path filters (1)
- `Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (4)
- .claude/CLAUDE.md
- Cargo.toml
- crates/lib/src/error.rs
- crates/lib/src/storage/s3.rs
…producer task errors out--otherwise we may attempt to begin or continue uploads after sending abort_multipart_upload, which would be weird
malcolmgreaves
left a comment
Overall a really great PR! This is very clean and easy to follow along, which is wonderful -- the codebase needs more PRs like this one!
I have two major comments on this PR. The first is about making the part_size and other runtime options of the upload configurable. This is so that we can have flexibility to change at runtime as well as the ability to handle files larger than ~80GB, which is a limitation of this implementation.
The second comment is about `OxenError`. There are two sub-comments to this one. The first is a request to use structured error variants instead of the `OxenError::Basic` variant for the different error conditions introduced here. The second is to use a more specific error type for the AWS SDK error; the `dyn` trait object is too wide.
Both of my comments have some suggested implementations. Happy to discuss through those further. Ultimately, I'm more looking for the end state and don't favor any approach over the other.
…ror that didn't originate from the aws sdk
…t multipart upload shenanigans
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@crates/lib/src/storage/s3.rs`:
- Line 185: The build fails because ByteStream::from_body_1_x (used in
storage::s3.rs) requires the aws-sdk-s3 crate to enable the http-body-1-x
feature; update Cargo.toml to change the aws-sdk-s3 dependency to include
features = ["http-body-1-x"] (e.g. aws-sdk-s3 = { version = "1.127.0", features
= ["http-body-1-x"] }) and then run cargo build/cargo update to ensure the
feature is picked up.
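Per the prompt, the dependency line in Cargo.toml would then look roughly like the following (version pin taken from the comment above; verify it against the workspace before applying):

```toml
[dependencies]
aws-sdk-s3 = { version = "1.127.0", features = ["http-body-1-x"] }
```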
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 8ab1d5db-06b2-493d-beba-f9ad15c097eb
⛔ Files ignored due to path filters (1)
- `Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (9)
- Cargo.toml
- crates/lib/Cargo.toml
- crates/lib/src/api/client/versions.rs
- crates/lib/src/core/v_latest/workspaces/commit.rs
- crates/lib/src/storage/local.rs
- crates/lib/src/storage/s3.rs
- crates/lib/src/storage/version_store.rs
- crates/server/src/controllers/commits.rs
- crates/server/src/controllers/versions.rs
✅ Files skipped from review due to trivial changes (1)
- crates/server/src/controllers/commits.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- crates/lib/Cargo.toml
- Cargo.toml
…3 without multipart upload shenanigans" because it fails at runtime without a size. This reverts commit 7690d04.
…ge in the direct streaming attempt
…t; use file size to determine part size for multipart uploads for files > 100MB
```diff
 async fn unpack_entry_tarball_async(
     repo: &LocalRepository,
-    compressed_data: &[u8],
+    compressed_data: Vec<u8>,
```
(minor comment) Do we need to consume this `compressed_data`? Or would a borrow on it still suffice for creating `let reader = Cursor::new(compressed_data);`? I feel like we can keep this one as-is, but perhaps I'm missing something?
A `&[u8]` won't work here because `store_version_from_reader` on line 1270 now requires owned data, and `file` derives its lifetime from `compressed_data`. I'm not sure exactly why the lifetime of the `&[u8]` is not deemed sufficient, but I can't find a way to get the compiler to allow it. Lucky for us, the callers didn't need the data after calling this, so it was an easy change.
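The compiler's objection is most likely the implicit `'static` bound on the boxed trait object: `Box<dyn AsyncRead + Send>` means `Box<dyn AsyncRead + Send + 'static>`, so a `Cursor` over a borrowed `&[u8]` cannot be boxed that way, while a `Cursor<Vec<u8>>` owning its buffer can. A minimal synchronous sketch of the same constraint, using `std::io::Read` in place of `AsyncRead` (function names are illustrative):

```rust
use std::io::{Cursor, Read};

// Mirrors the new owned-reader API: the boxed trait object defaults to
// `+ 'static`, so the reader must own its data.
fn into_reader(compressed_data: Vec<u8>) -> Box<dyn Read + Send + 'static> {
    Box::new(Cursor::new(compressed_data))
}

// The borrowed version does not compile, because the returned box would
// capture the caller's lifetime:
// fn into_reader_borrowed(data: &[u8]) -> Box<dyn Read + Send + 'static> {
//     Box::new(Cursor::new(data)) // error: borrowed data escapes
// }

fn main() {
    let mut reader = into_reader(vec![1, 2, 3]);
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).unwrap();
    assert_eq!(buf, vec![1, 2, 3]);
}
```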
malcolmgreaves
left a comment
LGTM! 👏 Minor non-blocking comments.
Co-authored-by: Malcolm Greaves <malcolmgreaves@users.noreply.github.com>
I'm tackling the S3 implementation one method at a time, and there are many more methods to go, so it will be a bit until we get into a state where we can actually try it out. But I have unit tests in a separate PR (since they required some hefty dependency updates).
…`OxenError::AwsS3Error` variant.